package com.atguigu.flink.chapter11;

import com.atguigu.flink.bean.WaterSensor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Table API demo: turns a small in-memory stream of {@link WaterSensor} elements into a
 * dynamic table, sums the {@code vc} column per {@code id}, converts the aggregated table
 * back into a retract stream and prints the rows whose retract flag is {@code true}.
 *
 * @Author lzc
 * @Date 2022/7/12 11:12
 */
public class Flink01_Table_Base_Use_3 {

    /**
     * Builds and runs the demo job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if {@code env.execute()} fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> sensorStream = sampleReadings(env);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Convert the stream into a dynamic table.
        Table sensorTable = tableEnv.fromDataStream(sensorStream);
        Table vcSumPerId = sumVcById(sensorTable);

        // Convert the result table back into a stream. Each element carries a Boolean
        // flag in f0 and the row in f1.
        DataStream<Tuple2<Boolean, Row>> retractStream =
            tableEnv.toRetractStream(vcSumPerId, Row.class);

        // Keep only the elements whose flag is true, then unwrap the row itself.
        DataStream<Tuple2<Boolean, Row>> flaggedTrue = retractStream.filter(change -> change.f0);
        DataStream<Row> rows = flaggedTrue.map(change -> change.f1);
        rows.print();

        // Required because the job contains DataStream operations; a job without
        // stream operations would not need this call.
        env.execute();
    }

    /**
     * Creates the fixed set of six sample sensor elements used by this demo.
     *
     * @param env the execution environment that owns the source
     * @return a source stream emitting the sample elements
     */
    private static DataStreamSource<WaterSensor> sampleReadings(StreamExecutionEnvironment env) {
        return env.fromElements(
            new WaterSensor("sensor_1", 1L, 10),
            new WaterSensor("sensor_1", 2L, 20),
            new WaterSensor("sensor_2", 3L, 30),
            new WaterSensor("sensor_1", 4L, 40),
            new WaterSensor("sensor_2", 5L, 50),
            new WaterSensor("sensor_1", 6L, 60)
        );
    }

    /**
     * Table API equivalent of {@code select id, sum(vc) vc_sum from t group by id}.
     *
     * @param source table exposing the {@code id} and {@code vc} columns
     * @return a table with the columns {@code id} and {@code vc_sum}
     */
    private static Table sumVcById(Table source) {
        return source
            .groupBy($("id"))
            .aggregate($("vc").sum().as("vc_sum"))
            .select($("id"), $("vc_sum"));
    }
}
